/*###################################################################
  > File Name: storage/replica/diskmd/diskmd_pool.c
  > Author: Vurtune
  > Mail: vurtune@foxmail.com
  > Created Time: Wed 17 Jan 2018 07:42:18 PM PST
###################################################################*/
#include "config.h"

#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <linux/fs.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <limits.h>
#include <errno.h>
#include <sys/vfs.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "nodectl.h"
#include "lich_api.h"
#include "disk.h"
#include "disk_slot.h"
#include "diskmd_pool.h"

/*
 * A pending allocation. __diskmd_allocate_request() queues it on
 * pool->allocator.list, and disk_allocate_worker() completes it.
 */
typedef struct {
        struct list_head hook;  /* queue link; must stay the first member: workers cast the list node directly to allocate_request_t */
        int retval;             /* result of disk_bmap_get_empty(), stored by the serving worker */
        sy_rwlock_t lock;       /* completion signal: write-locked by the requester, unlocked by the worker */
        uint64_t rand;          /* random tag, used only in debug logs */
        int locs_count;         /* number of entries in locs */
        locs_t *locs;           /* requester-owned output array, filled by the worker */
} allocate_request_t;

/*
 * Lifecycle of a per-disk allocator worker thread:
 *  - STARTING: set when the worker is created;
 *  - RUNNING:  set by the thread itself on startup;
 *  - STOP:     set by disk_allocate_worker_exit() to request termination;
 *  - STOPPED:  set by the thread just before it calls pthread_exit().
 */
typedef enum {
        __STATUS_STARTING__ = 0,
        __STATUS_RUNNING__,
        __STATUS_STOP__,
        __STATUS_STOPPED__,
} worker_status_t;

/* Per-disk allocator worker state; created by diskmd_pool_add_disk_nolock(). */
typedef struct {
        struct {
                sem_t sem;      /* wakes the worker; also published as disk->sem */
                disk_t *disk;   /* disk this worker allocates from and deletes on */
                pool_t *pool;   /* pool whose shared allocator queue this worker serves */
        } arg;
        worker_status_t status; /* written by the worker thread and by disk_allocate_worker_exit() without a lock */
} disk_worker_t;

/* Allocator thread entry point and its synchronous shutdown helper. */
static void *disk_allocate_worker(void *arg);
static void disk_allocate_worker_exit(disk_worker_t *worker);

/**
 * Allocate and initialise an empty pool called @name.
 *
 * @param name   pool name, copied into pool->name
 * @param _pool  receives the new pool on success
 * @return 0 on success; ENAMETOOLONG if @name does not fit in pool->name;
 *         otherwise the error from ymalloc() or the lock initialisers.
 */
int create_pool(const char *name, pool_t **_pool)
{
        int ret;
        pool_t *pool;

        ret = ymalloc((void **)&pool, sizeof(*pool));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(pool, 0x0, sizeof(*pool));

        /*
         * pool->name is a fixed-size buffer. Refuse names that would
         * overflow it (strcpy() did not check) instead of silently
         * truncating, because the name is used to build per-pool keys.
         */
        if (unlikely(strlen(name) >= sizeof(pool->name))) {
                ret = ENAMETOOLONG;
                GOTO(err_free, ret);
        }

        INIT_LIST_HEAD(&pool->allocator.list);

        ret = sy_rwlock_init(&pool->lock, "disk_manager.pool.lock");
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = sy_spin_init(&pool->allocator.lock);
        if (unlikely(ret))
                GOTO(err_free, ret);

        strcpy(pool->name, name);

        *_pool = pool;

        return 0;
err_free:
        yfree((void **)&pool);
err_ret:
        return ret;
}

/*
 * Release the memory of a pool obtained from create_pool().
 * The pool pointer itself (not a copy) is handed to yfree().
 */
void free_pool(pool_t **pool)
{
        void **slot = (void **)pool;

        yfree(slot);
}

/*
 * Sum bitmap capacity (bmap.size) and usage (bmap.nr_one) over every disk
 * slot of @pool. Disks rejected by disk_avaiable() and disks whose cache
 * value is 100 are skipped. The caller is expected to hold pool->lock;
 * diskmd_pool_dfree1() is the locked variant.
 *
 * @param print  when non-zero, log a per-disk line
 */
void diskmd_pool_dfree_nolock(pool_t *pool, uint64_t *_pool_total, uint64_t *_pool_used, int print)
{
        int ret, slot;
        disk_t *disk;

        *_pool_total = 0;
        *_pool_used = 0;

        for (slot = 0; slot < DISK_MAX; slot++) {
                if (!pool->disk_array[slot])
                        continue;

                ret = disk_slot_get(slot, &disk);
                if (unlikely(ret)) {
                        UNIMPLEMENTED(__DUMP__);
                }

                if (disk_avaiable(disk) && disk->cache != 100) {
                        if (print) {
                                DINFO("pool %s disk %u total %u used %u\n",
                                                pool->name, slot, disk->bmap.size,
                                                disk->bmap.nr_one);
                        }

                        *_pool_used += disk->bmap.nr_one;
                        *_pool_total += disk->bmap.size;
                }

                disk_slot_release(slot);
        }
}

/*
 * Locked variant of diskmd_pool_dfree_nolock(). It holds pool->lock shared
 * while summing.
 *
 * @return 0 on success, or the error from sy_rwlock_rdlock(), in which case
 *         the outputs are not touched.
 */
int diskmd_pool_dfree1(pool_t *pool, uint64_t *_pool_total, uint64_t *_pool_used, int print)
{
        int rc;

        rc = sy_rwlock_rdlock(&pool->lock);
        if (unlikely(rc))
                GOTO(err_ret, rc);

        diskmd_pool_dfree_nolock(pool, _pool_total, _pool_used, print);
        sy_rwlock_unlock(&pool->lock);

        return 0;
err_ret:
        return rc;
}

/**
 * Attach @disk to @pool and start its allocator worker thread.
 * Expects pool->lock to be held for writing, as diskmd_pool_add_disk() does.
 *
 * @param pool  target pool; slot disk->idx must currently be free
 * @param disk  disk to attach; disk->disk_fd must be set
 * @param bad   unused
 * @return 0 on success; otherwise the error from ymalloc(), sem_init()
 *         (as an errno value) or disk_slot_set().
 */
int diskmd_pool_add_disk_nolock(pool_t* pool, disk_t *disk, int bad)
{
        int ret;
        int idx = disk->idx;
        sem_t *prev_sem;
        disk_worker_t *worker;

        (void) bad;

        YASSERT(pool && pool->disk_array[idx] == 0);
        YASSERT(disk->disk_fd);

        ret = ymalloc((void **)&worker, sizeof(*worker));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // TODO: hand the disk to the thread by pointer, or have the thread call disk_slot_get() itself?
        worker->arg.disk = disk;
        worker->arg.pool = pool;
        worker->status = __STATUS_STARTING__;

        /* sem_init() returns -1 and reports the actual cause in errno. */
        ret = sem_init(&worker->arg.sem, 0, 0);
        if (unlikely(ret)) {
                ret = errno;
                GOTO(err_free, ret);
        }

        prev_sem = disk->sem;
        disk->sem = &worker->arg.sem;
        ret = disk_slot_set(idx, disk, worker);
        if (unlikely(ret)) {
                /* Don't leave disk->sem pointing into the worker we are about to free. */
                disk->sem = prev_sem;
                GOTO(err_sem, ret);
        }

        DINFO("pool %s disk %d\n", pool->name, idx);

        pool->disk_array[idx] = 1;
        pool->disk_max = pool->disk_max < disk->idx ? idx : pool->disk_max;

        ret = sy_thread_create2(disk_allocate_worker, worker, "diskmd.allocator_worker");
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        return 0;
err_sem:
        sem_destroy(&worker->arg.sem);
err_free:
        yfree((void **)&worker);
err_ret:
        return ret;
}

/*
 * Locked wrapper around diskmd_pool_add_disk_nolock(). It holds pool->lock
 * exclusively for the duration of the call; @bad is passed through unchanged.
 *
 * @return 0 on success, or the error from locking or from the _nolock call.
 */
int diskmd_pool_add_disk(pool_t* pool, disk_t *disk, int bad)
{
        int rc;

        rc = sy_rwlock_wrlock(&pool->lock);
        if (unlikely(rc))
                GOTO(err_ret, rc);

        rc = diskmd_pool_add_disk_nolock(pool, disk, bad);
        sy_rwlock_unlock(&pool->lock);
        if (unlikely(rc))
                GOTO(err_ret, rc);

        return 0;
err_ret:
        return rc;
}

/**
 * Detach slot @idx from @pool and stop the disk's allocator worker.
 * Expects pool->lock to be held for writing, as diskmd_pool_remove_disk() does.
 *
 * @param pool  pool owning the slot
 * @param idx   slot index, 0 <= idx < DISK_MAX; must be in use
 * @param disk  receives the removed disk (from disk_slot_remove())
 * @return 0 (a disk_slot_remove() failure is handled by UNIMPLEMENTED())
 */
int diskmd_pool_remove_disk_nolock(pool_t* pool, int idx, disk_t **disk)
{
        int ret;
        disk_worker_t *worker = NULL;

        /* disk_array has DISK_MAX entries, so DISK_MAX itself is out of range. */
        YASSERT(idx >= 0 && idx < DISK_MAX);
        YASSERT(pool->disk_array[idx]);

        DINFO("pool %s disk %d\n", pool->name, idx);

        pool->disk_array[idx] = 0;

        ret = disk_slot_remove(idx, disk, (void **)&worker);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        if (worker) {
                /* Blocks until the thread has left its sem_wait() loop. */
                disk_allocate_worker_exit(worker);
                sem_destroy(&worker->arg.sem);
                yfree((void **)&worker);
        }

        return 0;
}

/**
 * Remove slot @idx from @pool under pool->lock (exclusive), then unload the
 * removed disk after dropping the lock.
 *
 * @param home  passed through to disk_unload()
 * @return 0 on success, or the error from locking or from the removal.
 */
int diskmd_pool_remove_disk(const char *home, pool_t* pool, int idx)
{
        int ret;
        disk_t *disk = NULL;    /* defined even if the removal never sets it */

        ret = sy_rwlock_wrlock(&pool->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = diskmd_pool_remove_disk_nolock(pool, idx, &disk);

        sy_rwlock_unlock(&pool->lock);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (disk != NULL) {
                disk_unload(disk, home, pool->name);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Collect the disks of @pool that can currently serve an allocation.
 *
 * The scan starts at slot pool->disk_allocate_ptr[tier] and wraps around
 * slots 0..disk_max. A disk is kept if it:
 *  - passes disk_avaiable();
 *  - is not flagged __DISK_DELETING__;
 *  - has a cache value other than 100;
 *  - has at least locs_count + 1 free bitmap entries;
 *  - belongs to @tier.
 * disk_slot_get() is held on every disk stored in @array. Pair the call with
 * diskmd_pool_disks_release().
 *
 * @param array       output; must hold at least pool->disk_max + 1 entries
 * @param disk_count  receives the number of disks stored in @array
 * @return 0 (a disk_slot_get() failure is handled by UNIMPLEMENTED())
 */
int diskmd_pool_disks_get(pool_t *pool, int tier, int locs_count, disk_t **array, int *disk_count)
{
        int ret, idx, r, i, max;
        disk_t *disk;

        YASSERT(tier >= 0 && tier < DISK_TIER_MAX);

        *disk_count = 0;

        max = pool->disk_max + 1;
        r = pool->disk_allocate_ptr[tier];

        for (i = 0; i < max; i++) {
                idx = (i + r) % max;

                if (!pool->disk_array[idx])
                        continue;

                ret = disk_slot_get(idx, &disk);
                if (unlikely(ret)) {
                        UNIMPLEMENTED(__DUMP__);
                }

                if (!disk_avaiable(disk)) {
                        DBUG("disk %u not exist\n", idx);
                        disk_slot_release(idx);
                        continue;
                }

                if (disk->status & __DISK_DELETING__) {
                        disk_slot_release(idx);
                        continue;
                }

                if (disk->cache == 100) {
                        disk_slot_release(idx);
                        continue;
                }

                if (disk->bmap.size - disk->bmap.nr_one < locs_count + 1) {
                        disk_slot_release(idx);
                        continue;
                }

                if (disk->tier != tier) {
                        /* log the slot index, not the loop counter */
                        DBUG("disk[%u] need %d got %d\n", idx, tier, disk->tier);
                        disk_slot_release(idx);
                        continue;
                }

                array[(*disk_count)++] = disk;
        }

        return 0;
}


/*
 * Release, in array order, the slot held on each disk collected by
 * diskmd_pool_disks_get(). @pool is unused.
 */
void diskmd_pool_disks_release(pool_t *pool, disk_t **array, int disk_count)
{
        (void) pool;

        for (int n = 0; n < disk_count; n++)
                disk_slot_release(array[n]->idx);
}

/*
 * Recompute pool->tier_min and pool->tier_max from the available disks.
 *
 * The first pass finds the range of tiers already assigned (tier != -1).
 * The second pass puts every available disk that has no tier yet (-1) into
 * the highest tier found (0 if none). With no assigned disk, tier_min
 * falls back to 0.
 */
void disk_manager_pool_update_tier(pool_t *pool)
{
        int ret, slot, nslots;
        int tier_lo = INT_MAX;
        int tier_hi = 0;
        disk_t *disk;

        DINFO("disk max %u\n", pool->disk_max);

        nslots = pool->disk_max + 1;

        for (slot = 0; slot < nslots; slot++) {
                if (!pool->disk_array[slot])
                        continue;

                ret = disk_slot_get(slot, &disk);
                if (unlikely(ret)) {
                        YASSERT(0 && "why ?");
                }

                if (disk_avaiable(disk) && disk->tier != -1) {
                        if (disk->tier > tier_hi)
                                tier_hi = disk->tier;
                        if (disk->tier < tier_lo)
                                tier_lo = disk->tier;
                }
                disk_slot_release(slot);
        }

        for (slot = 0; slot < nslots; slot++) {
                if (!pool->disk_array[slot])
                        continue;

                ret = disk_slot_get(slot, &disk);
                if (unlikely(ret)) {
                        YASSERT(0 && "why ?");
                }

                if (disk_avaiable(disk) && disk->tier == -1) {
                        DINFO("set disk[%u], tier %u\n", slot, tier_hi);
                        disk->tier = tier_hi;
                }
                disk_slot_release(slot);
        }

        if (tier_lo == INT_MAX)
                tier_lo = 0;

        pool->tier_min = tier_lo;
        pool->tier_max = tier_hi;

        YASSERT(tier_lo <= tier_hi);
}

/*
 * One sub-request of __diskmd_allocate_request_multi(). Records are laid
 * out back to back, each followed by its locs[] slots, so the trailing
 * array is a C99 flexible array member (not the GNU zero-length form).
 */
typedef struct {
        co_fork_hook_t fork_hook;   /* co_fork bookkeeping (fork, idx) used by co_fork_return() */

        // params
        pool_t *pool;
        disk_t **array;             /* candidate disks shared by all sub-requests */
        int disk_count;
        int locs_count;             /* number of locs[] slots in this record */
        int loc_idx;                /* read cursor used while merging results */

        // result
        locs_t locs[];
} diskmd_allocate_ctx_t;

/*
 * Apply every queued delete request in @list to @disk's bitmap under the
 * disk write lock, freeing each request as it goes. Afterwards, unless
 * gloconf.bmap_mem is set, the bitmap mapping is msync()ed even when
 * @list was empty.
 */
static void __diskmd_delete(disk_t *disk, struct list_head *list)
{
        int ret, deleted = 0;
        struct list_head *cur, *next;
        delete_request_t *req;

        ret = sy_rwlock_wrlock(&disk->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        list_for_each_safe(cur, next, list) {
                /* assumes hook is the first member of delete_request_t */
                req = (void *) cur;
                list_del(&req->hook);

                YASSERT(disk->idx == req->loc.diskid);
                ret = bmap_del(&disk->bmap, req->loc.idx);
                YASSERT(ret == 0);

                DBUG("disk[%u] delete %u\n", disk->idx, req->loc.idx);

                yfree((void **)&cur);
                deleted++;
        }

        if (deleted) {
                DBUG("disk[%u] delete %u\n", disk->idx, deleted);
        }

        if (!gloconf.bmap_mem) {
                ret = msync(disk->bmap.bits, disk->map_size, MS_SYNC);
                if (ret)
                        UNIMPLEMENTED(__DUMP__);
        }

        sy_rwlock_unlock(&disk->lock);
}

/*
 * Per-disk allocator/deleter thread.
 *
 * Each wake-up on worker->arg.sem does, in order:
 *  1. if disk_allocable(), pop at most one request from the pool's shared
 *     allocator queue, fill it from this disk with disk_bmap_get_empty(),
 *     and signal the requester by unlocking allocate_request->lock;
 *  2. move this disk's pending delete list to a local list and apply it
 *     with __diskmd_delete().
 * The thread exits when it wakes and finds status == __STATUS_STOP__.
 *
 * @param arg  the disk_worker_t created by diskmd_pool_add_disk_nolock()
 * @return does not return normally; exits via pthread_exit(NULL)
 */
static void *disk_allocate_worker(void *arg)
{
        int ret;
        struct list_head list;
        allocate_request_t *allocate_request;

        disk_worker_t *worker = arg;
        pool_t *pool = worker->arg.pool;
        disk_t *disk = worker->arg.disk;

        worker->status = __STATUS_RUNNING__;
        DINFO("pool %s disk %d allocator startup\n", pool->name, disk->idx);

        INIT_LIST_HEAD(&list);

        while (1) {
                allocate_request = NULL;

                /*
                 * sem_wait() fails with EINTR whenever a signal handler runs
                 * on this thread, and it is never restarted, even with
                 * SA_RESTART. That is not an error, so just wait again.
                 */
                do {
                        ret = sem_wait(&worker->arg.sem);
                } while (ret == -1 && errno == EINTR);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                if (worker->status == __STATUS_STOP__) {
                        DWARN("disk %d allocator worker while exit\n", disk->idx);
                        break;
                }

                /*
                 * Checking the disk status alone is not enough. The proper fix
                 * is to split allocation and deletion, since they share one
                 * semaphore.
                 */
                if (disk_allocable(disk)) {
                        ret = sy_spin_lock(&pool->allocator.lock);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        if (!list_empty(&pool->allocator.list)) {
                                allocate_request = (void *)pool->allocator.list.next;
                                list_del(&allocate_request->hook);
                        }

                        sy_spin_unlock(&pool->allocator.lock);

                        if (allocate_request) {
                                ret = disk_bmap_get_empty(disk, allocate_request->locs,
                                                allocate_request->locs_count);
                                allocate_request->retval = ret;
                                DBUG("post request %ju\n", allocate_request->rand);
                                /* wakes the requester blocked in __diskmd_allocate_request() */
                                sy_rwlock_unlock(&allocate_request->lock);
                        }
                }

                ret = sy_spin_lock(&disk->delete_lock);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                if (!list_empty(&disk->delete_list)) {
                        list_splice_init(&disk->delete_list, &list);
                }

                sy_spin_unlock(&disk->delete_lock);

                __diskmd_delete(disk, &list);
        }

        worker->status = __STATUS_STOPPED__;
        pthread_exit(NULL);
}

/*
 * Ask @worker's thread to stop and wait for it. The stop flag is set, the
 * thread is woken, and the status is polled every 50 ms until the thread
 * reports __STATUS_STOPPED__.
 */
static void disk_allocate_worker_exit(disk_worker_t *worker)
{
        worker->status = __STATUS_STOP__;
        sem_post(&worker->arg.sem);

        while (worker->status != __STATUS_STOPPED__) {
                DINFO("waiting disk %d allocator_worker exit\n", worker->arg.disk->idx);
                usleep(50 * 1000);
        }

        DWARN("disk %d allocator_worker exited\n", worker->arg.disk->idx);
}

STATIC int __diskmd_allocate_undo(void *_ctx, int retval, void *arg)
{
        int ret;
        diskmd_allocate_ctx_t *ctx = _ctx;

        (void)arg;

        if (retval == 0) {
                for (int i=0; i < ctx->locs_count; i++) {
                        if (ctx->locs[i].valid) {
                                DWARN("i %d delete "LOC_FORMAT"\n", i, LOC_ARG(&ctx->locs[i].loc));
                                ret = diskmd_delete(&ctx->locs[i].loc);
                                if (unlikely(ret)) {
                                        DWARN("ret %d\n", ret);
                                }
                        }
                }
        }

        return 0;
}

/*
 * Queue one allocation request on the pool's shared allocator queue and
 * block until some disk worker has served it.
 *
 * Handshake: the request's rwlock is write-locked here before queueing. The
 * worker that dequeues the request (disk_allocate_worker) fills @locs via
 * disk_bmap_get_empty(), stores the result in retval, and unlocks the
 * rwlock. That unlock lets the second sy_rwlock_wrlock() below return.
 * NOTE(review): the rwlock is unlocked by a thread that did not lock it,
 * which is undefined for POSIX rwlocks; confirm that sy_rwlock_t allows it.
 * NOTE(review): the rwlock is never destroyed, and if no worker ever
 * dequeues the request this call waits forever.
 *
 * @param pool        pool whose allocator queue receives the request
 * @param disk        disks whose workers are woken; disk_count must be > 0
 * @param disk_count  number of entries in @disk
 * @param locs        output array of @locs_count entries, filled by the worker
 * @param locs_count  number of locations requested
 * @return 0 on success, the worker's disk_bmap_get_empty() error, or a
 *         setup error (ymalloc / rwlock / spinlock)
 */
STATIC int __diskmd_allocate_request(pool_t *pool, disk_t **disk, int disk_count,
                locs_t *locs, int locs_count)
{
        int ret, i;
        allocate_request_t *allocate_request;

        ret = ymalloc((void **)&allocate_request, sizeof(*allocate_request));
        if (ret)
                GOTO(err_ret, ret);

        allocate_request->locs = locs;
        allocate_request->locs_count = locs_count;

        ret = sy_rwlock_init(&allocate_request->lock, "lock_sem");
        if (ret)
                GOTO(err_free, ret);

        /* first acquisition: held until the serving worker unlocks it */
        ret = sy_rwlock_wrlock(&allocate_request->lock);
        if (ret)
                GOTO(err_free, ret);

        allocate_request->rand = fastrandom();
        DBUG("request %ju\n", allocate_request->rand);
        ret = sy_spin_lock(&pool->allocator.lock);
        if (ret)
                GOTO(err_lock, ret);

        list_add_tail(&allocate_request->hook, &pool->allocator.list);

        sy_spin_unlock(&pool->allocator.lock);

        // Wake the worker thread of every disk so that they all compete for
        // this request; this avoids the problems of round-robin dispatch.
        // Posting starts at a random disk.
        uint64_t r = fastrandom();
        for (i = 0; i < disk_count; i++) {
                sem_post(disk[(i + r) % disk_count]->sem);
        }

        CORE_ANALYSIS_BEGIN(1);

        //DINFO("lock wait\n");
        DBUG("wait %ju\n", allocate_request->rand);
        /* second acquisition: blocks until the serving worker unlocks */
        ret = sy_rwlock_wrlock(&allocate_request->lock);
        if (ret)
                UNIMPLEMENTED(__DUMP__);

        DBUG("return %ju\n", allocate_request->rand);

        CORE_ANALYSIS_UPDATE(1, 1000 * 1000, "diskmd_getempty");

        sy_rwlock_unlock(&allocate_request->lock);

        /* result stored by the worker before it unlocked */
        ret = allocate_request->retval;
        if (ret) {
                GOTO(err_free, ret);
        }

        yfree((void **)&allocate_request);

        return 0;
err_lock:
        sy_rwlock_unlock(&allocate_request->lock);
err_free:
        yfree((void **)&allocate_request);
err_ret:
        return ret;
}

STATIC void __diskmd_allocate_request__(void *arg)
{
        int ret;
        diskmd_allocate_ctx_t *ctx = arg;

        ret = __diskmd_allocate_request(ctx->pool, ctx->array, ctx->disk_count, ctx->locs, ctx->locs_count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        co_fork_return(ctx->fork_hook.fork, ctx->fork_hook.idx, 0);
        return;
err_ret:
        co_fork_return(ctx->fork_hook.fork, ctx->fork_hook.idx, ret);
}

/**
 * Allocate @locs_count disk locations by splitting the work into several
 * __diskmd_allocate_request() calls that run concurrently under a co_fork.
 * Results are interleaved round-robin across the sub-requests into @locs.
 * If the join reports an error, every sub-request that succeeded is undone.
 *
 * @param pool        pool to allocate from
 * @param array       candidate disks (slots held by the caller)
 * @param disk_count  number of entries in @array; must be > 0
 * @param locs        output array of @locs_count locations
 * @param locs_count  number of locations to allocate; must be > 0
 * @return 0 on success, otherwise the error from co_fork_create(), ymalloc()
 *         or co_fork_join()
 *
 * @todo spread consecutive chunks evenly across the disks
 * @todo should chunks on each disk be allocated randomly or sequentially?
 * @todo how should partial failure of sub-requests be handled?
 */
STATIC int __diskmd_allocate_request_multi(pool_t *pool, disk_t **array, int disk_count, diskloc_t *locs, int locs_count)
{
        int ret, i, left, step, step_in_theory, record_size, request_count;
        int loc_idx;
        co_fork_t *fork;
        diskmd_allocate_ctx_t *ctxs, *ctx;
        locs_t *loc;

        left = locs_count;

        // TODO decide how many chunks each sub-request should carry
        step_in_theory = 2 * locs_count / disk_count;
        if (step_in_theory == 0) {
                step_in_theory = 1;
        }

        /* each record is a ctx header followed by step_in_theory result slots */
        record_size = (sizeof(diskmd_allocate_ctx_t) + sizeof(locs_t) * step_in_theory);
        request_count = locs_count / step_in_theory + (locs_count % step_in_theory ? 1 : 0);

        YASSERT(locs_count > 0 && step_in_theory > 0 && request_count > 0);

        ret = co_fork_create(&fork, __FUNCTION__, request_count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&ctxs, record_size * request_count);
        if (unlikely(ret))
                GOTO(err_fork, ret);

        i = 0;
        while (left > 0) {
                step = _min(left, step_in_theory);

                DBUG("%d / %d locs_count %d disk_count %d step %d\n",
                                i, request_count, locs_count, disk_count, step);

                ctx = (void *)((char *)ctxs + record_size * i);
                /*
                 * Clear the whole record, including the trailing locs[]
                 * slots. __diskmd_allocate_undo() deletes every locs[] entry
                 * whose .valid is set, so unfilled slots must start cleared.
                 * (Previously only sizeof(record_size) bytes were cleared.)
                 */
                memset(ctx, 0x0, record_size);

                ctx->pool = pool;
                ctx->array = array;
                ctx->disk_count = disk_count;
                ctx->locs_count = step;
                ctx->loc_idx = 0;

                co_fork_add(fork, __diskmd_allocate_request__, ctx);
                left -= step;
                i++;
        }

        ret = co_fork_join(fork);
        if (unlikely(ret)) {
                // TODO partial success: undo what the successful sub-requests allocated
                co_fork_iterate(fork, __diskmd_allocate_undo, NULL);
                GOTO(err_free, ret);
        }

        /* merge: take one location from each sub-request in turn */
        loc_idx = 0;
        while (loc_idx < locs_count) {
                for (i = 0; i < request_count; i++) {
                        ctx = (void *)((char *)ctxs + record_size * i);
                        if (ctx->loc_idx < ctx->locs_count) {
                                loc = &ctx->locs[ctx->loc_idx];
                                DBUG("%d diskloc "LOC_FORMAT"\n", loc_idx, LOC_ARG(&loc->loc));

                                locs[loc_idx] = loc->loc;

                                ctx->loc_idx++;
                                loc_idx++;
                        }
                }
        }

        YASSERT(loc_idx == locs_count);

        yfree((void **)&ctxs);
        yfree((void **)&fork);
        return 0;
err_free:
        yfree((void **)&ctxs);
err_fork:
        yfree((void **)&fork);
err_ret:
        return ret;
}

/*
 * Single-location fast path: issue one __diskmd_allocate_request() into a
 * local result slot (valid cleared beforehand) and copy the location out.
 *
 * @param dloc  receives the allocated location on success
 * @return 0 on success, otherwise the allocator's error (*dloc untouched)
 */
STATIC int __diskmd_allocate_request_single(pool_t *pool, disk_t **disk, int disk_count,
                diskloc_t *dloc, int locs_count)
{
        int rc;
        locs_t result;

        result.valid = FALSE;

        rc = __diskmd_allocate_request(pool, disk, disk_count, &result, locs_count);
        if (unlikely(rc))
                GOTO(err_ret, rc);

        *dloc = result.loc;
        return 0;

err_ret:
        return rc;
}

/**
 * Allocate @locs_count disk locations from the disks of @tier.
 *
 * pool->lock is held shared for the whole operation. ENOSPC and ENODEV from
 * the allocator are retried up to 3 times with a freshly collected disk list.
 *
 * @return 0 on success; ENOSPC if no disk of @tier qualifies; otherwise the
 *         error from locking or from the allocator.
 */
int diskmd_pool_get_empty_with_tier(pool_t *pool, diskloc_t *locs, int locs_count, int tier)
{
        int ret, disk_count = 0, retry = 0;
        disk_t *array[DISK_MAX];

        ret = sy_rwlock_rdlock(&pool->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

retry:
        ret = diskmd_pool_disks_get(pool, tier, locs_count, array, &disk_count);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        if (disk_count == 0) {
                ret = ENOSPC;
                GOTO(err_release, ret);
        }

        if (locs_count == 1)
                ret = __diskmd_allocate_request_single(pool, array, disk_count, locs, locs_count);
        else
                ret = __diskmd_allocate_request_multi(pool, array, disk_count, locs, locs_count);

        if (unlikely(ret)) {
                if ((ret == ENOSPC || ret == ENODEV) && retry < 3) {
                        /*
                         * diskmd_pool_disks_get() takes a new slot on every
                         * disk it returns, so release the current ones first.
                         * Otherwise each retry leaks one slot per disk.
                         */
                        diskmd_pool_disks_release(pool, array, disk_count);
                        disk_count = 0;
                        retry++;
                        goto retry;
                }
                GOTO(err_release, ret);
        }

        diskmd_pool_disks_release(pool, array, disk_count);
        sy_rwlock_unlock(&pool->lock);
        return 0;
err_release:
        diskmd_pool_disks_release(pool, array, disk_count);
err_unlock:
        sy_rwlock_unlock(&pool->lock);
err_ret:
        return ret;
}


/*
 * Allocate @locs_count locations from the lowest tier that can serve them,
 * trying tiers pool->tier_min..pool->tier_max in order. On success the tier
 * used is stored in *_tier.
 *
 * If every tier fails, the scan is repeated up to 5 more times with a 50 ms
 * sleep in between, since tiers may be changing. After that the call fails
 * with ENOSPC whatever the per-tier errors were.
 */
int diskmd_pool_get_empty(pool_t *pool, diskloc_t *locs, int locs_count, int *_tier)
{
        int ret, tier, lo, hi, attempt = 0;

        ANALYSIS_BEGIN(0);

        for (;;) {
                lo = pool->tier_min;
                hi = pool->tier_max;

                for (tier = lo; tier < hi + 1; tier++) {
                        ret = diskmd_pool_get_empty_with_tier(pool, locs, locs_count, tier);
                        if (ret == 0) {
                                *_tier = tier;
                                break;
                        }
                }

                if (tier != hi + 1)
                        break;

                /* disk tier maybe updating */
                if (attempt >= 5) {
                        ret = ENOSPC;
                        GOTO(err_ret, ret);
                }

                attempt++;
                schedule_sleep("getempty", 50);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "diskmd_getempty");

        return 0;
err_ret:
        return ret;
}

/*
 * Publish one "diskstat/<pool>/<slot>.stat" record per disk of @pool via
 * nodectl_set(). The write-back fields (wbcount/wbused/wbtotal) are always
 * reported as 0. Failures of nodectl_set() are logged but not returned.
 */
void diskmd_pool_dump(pool_t *pool)
{
        int ret, slot, wbcount, wbused, wbtotal;
        disk_t *disk;
        char *nl;
        char key[MAX_PATH_LEN], value[MAX_BUF_LEN];

        DBUG("pool %s\n", pool->name);

        for (slot = 0; slot < DISK_MAX; slot++) {
                if (!pool->disk_array[slot])
                        continue;

                ret = disk_slot_get(slot, &disk);
                if (unlikely(ret))
                        continue;

                wbcount = wbused = wbtotal = 0;

                snprintf(key, sizeof(key), "diskstat/%s/%u.stat", pool->name, slot);
                snprintf(value, sizeof(value),
                                "disk:%u\n"
                                "online:%u\n"
                                "cached:%u\n"
                                "cache:%u\n"
                                "wbcount:%u\n"
                                "wbused:%u\n"
                                "wbtotal:%u\n"
                                "used:%u\n"
                                "total:%u\n",
                                slot,
                                !(disk->status & __DISK_OFFLINE__),
                                disk->cached,
                                disk->cache,
                                wbcount,
                                wbused,
                                wbtotal,
                                disk->bmap.nr_one,
                                disk->bmap.size);

                /* the snapshot is in value[]; the disk is no longer needed */
                disk_slot_release(slot);

                ret = nodectl_set(key, value);
                if (unlikely(ret)) {
                        DERROR("set %s fail\n", key);
                }

                /* flatten the record onto one line for the debug log */
                for (nl = strchr(value, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
                        *nl = ' ';

                DBUG("key %s value %s\n", key, value);
        }
}
